package com.yeskery.nut.util.executor;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;

/**
 * 执行器实现类，如果使用{@link #setExecutorService(ExecutorService)}方法设置线程池，将会在运行结束后自动关闭该线程池
 * @param <T> 执行器数据类型
 * @author sunjay
 * 2024/9/1
 */
public class ExecutorImpl<T> implements Executor<T> {

    /** 日志对象 */
    private static final Logger logger = Logger.getLogger(ExecutorImpl.class.getName());

    /** 任务集合 */
    private final List<Job<T>> jobs = new LinkedList<>();

    /** 数据集合 */
    private final List<T> dataList = new LinkedList<>();

    /** 数据流集合 */
    private final List<Stream<T>> dataStreams = new LinkedList<>();

    /** 执行器上下文 */
    private final ExecutorContext executorContext = new ExecutorContextImpl();

    /** 运行状态 */
    private volatile RunStatus runStatus = RunStatus.NOT_START;

    /** 线程池 */
    private ExecutorService executorService;

    /** 是否在执行器结束时自动关闭该线程池 */
    private boolean autoCloseExecutorService;

    /** 执行器回调 */
    private ExecutorCallback executorCallback;

    @Override
    public void addJob(Job<T> job) {
        checkRunStatus();
        if (jobs.stream().map(Job::getName).anyMatch(r -> r.equals(job.getName()))) {
            throw new ExecuteException("Job Name[" + job.getName() + "] Already Exist.");
        }
        jobs.add(job);
    }

    @Override
    public void addJobs(Collection<Job<T>> jobs) {
        for (Job<T> job : jobs) {
            addJob(job);
        }
    }

    @Override
    public void addDataSource(Collection<T> dataSource) {
        checkRunStatus();
        dataList.addAll(dataSource);
    }

    @Override
    public void addDataSource(Stream<T> dataSource) {
        checkRunStatus();
        dataStreams.add(dataSource);
    }

    @Override
    public RunStatus getRunStatus() {
        return runStatus;
    }

    @Override
    public void setExecutorService(ExecutorService executorService, boolean autoClose) {
        this.executorService = executorService;
        this.autoCloseExecutorService = autoClose;
    }

    @Override
    public void start() {
        checkRunStatus();
        runStatus = RunStatus.RUNNING;
        for (T data : dataList) {
            if (runStatus != RunStatus.RUNNING) {
                break;
            }
            invokeJob(data);
        }
        for (Stream<T> dataStream : dataStreams) {
            dataStream.forEach(data -> {
                if (runStatus != RunStatus.RUNNING) {
                    return;
                }
                invokeJob(data);
            });
        }

        try {
            close();
        } catch (Exception e) {
            throw new ExecuteException("Executor Close Failure.", e);
        }

        try {
            invokeExecutorCallback();
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Failed To Run ExecutorCallback.", e);
        }
    }

    @Override
    public void setExecutorCallback(ExecutorCallback executorCallback) {
        this.executorCallback = executorCallback;
    }

    public void close() throws IOException {
        runStatus = RunStatus.EXIT;
        if (executorService != null && autoCloseExecutorService) {
            executorService.shutdown();
        }
    }

    /**
     * 检查运行状态
     */
    private void checkRunStatus() {
        if (runStatus == RunStatus.RUNNING) {
            throw new ExecuteException("Executor Already Running.");
        } else if (runStatus == RunStatus.FINISH || runStatus == RunStatus.COMPLETE
                || runStatus == RunStatus.EXIT) {
            throw new ExecuteException("Executor Already Closed.");
        }
    }

    /**
     * 执行任务
     * @param data 数据对象
     */
    private void invokeJob(T data) {
        for (Job<T> job : jobs) {
            try {
                job.setExecutorContext(executorContext);
                if (executorService != null) {
                    executorService.execute(() -> job.run(data));
                } else {
                    job.run(data);
                }
            } catch (Exception e) {
                logger.log(Level.SEVERE, "Failed To Run Job[name="+ job.getName() +"]", e);
            }
        }
    }

    /**
     * 执行回调方法
     */
    private void invokeExecutorCallback() {
        if (executorCallback == null) {
            return;
        }
        if (executorService == null || !autoCloseExecutorService) {
            executorCallback.callback(executorContext);
            return;
        }
        try {
            while (true) {
                if (executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    break;
                }
            }
            executorCallback.callback(executorContext);
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            logger.log(Level.SEVERE, "ExecutorService[" + executorService.getClass().getName()
                    + "] Close Timeout, Callback Method Not Execute.");
        }
    }
}
